{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# 3 - Apache Spark ML- Create machine learning models\n",
    "\n",
    "In this chapter, you will:\n",
    "\n",
    "• Build your first ML model with Spark ML\n",
    "\n",
    "• Learn more Spark functionality and how to use it"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession \n",
    "\n",
    "\n",
    "spark = SparkSession.builder \\\n",
    "    .master('local[*]') \\\n",
    "    .appName(\"ApacheSparkML\") \\\n",
    "    .getOrCreate()\n",
    "\n",
    "df_train = spark.read.parquet(\"classified_train_data\")\n",
    "df_test = spark.read.parquet(\"classified_test_data\")\n",
    "\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "✅ Task :\n",
    "\n",
    "### Frequent Patterns Growth algorithm - FPGrowth()\n",
    "\n",
    "Your data is clean and organized. It's your turn to create your **first** ML model with Spark.\n",
    "\n",
    "Run the next code; it runs the **Frequent Patterns Growth** algorithm to extract patterns if those exist.\n",
    "\n",
    "Tweak the minSupport and minCondifence.\n",
    "\n",
    "\n",
    "<details><summary>Hint</summary>\n",
    "<p>\n",
    "\n",
    "Change the minSupport and minCondifence to 0.1 and see what happens.\n",
    "\n",
    "</p>\n",
    "</details>\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.fpm import FPGrowth\n",
    "\n",
    "fpGrowth = FPGrowth(itemsCol=\"description\", minSupport=0.5, minConfidence=0.9)\n",
    "fpGrowth_model = fpGrowth.fit(df_train)\n",
    "\n",
    "# Display frequent itemsets.\n",
    "fpGrowth_model.freqItemsets.show()\n",
    "\n",
    "# Display generated association rules.\n",
    "fpGrowth_model.associationRules.show()\n",
    "\n",
    "# transform examines the input items against all the association rules and summarize the\n",
    "# consequents as prediction\n",
    "fpGrowth_model.transform(df_train).toPandas().transpose()\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "What did you get?\n",
    "\n",
    "\n",
    "\n",
    "<details><summary>Check me after running the previous code multipale times</summary>\n",
    "<p>\n",
    "\n",
    "When tweaking the minSupport=0.21 and minConfidence=0.1\n",
    "\n",
    "You get:\n",
    "\n",
    "|items|freq|\n",
    "|----|:----:|\n",
    "|[and]| 389|\n",
    "\n",
    "\n",
    "</p>\n",
    "</details>\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "✅ **Task :**\n",
    "\n",
    "### LinearRegression functionality\n",
    "\n",
    "It's your turn to create your **second** ML model with Spark - Linear Regression.\n",
    "\n",
    "\n",
    "\n",
    "\n",
    "<details><summary>What is linear regression used for?</summary>\n",
    "<p>\n",
    "Linear regression is a common Statistical Data Analysis technique. It is used to determine the extent to which there is a linear relationship between a dependent variable and one or more independent variables.\n",
    "</p>\n",
    "</details>\n",
    "\n",
    "\n",
    "**But**, before jumping right into it, you should know:\n",
    "\n",
    "Spark ML Linear Regression **input** is:\n",
    "1. `label` of type Double - our classification\n",
    "2. `features` of type - `Vector[Double]` - Vector of Double, turn all columns into one column named features.\n",
    "Hence, you will transform all _numeric_ columns into one Vector.\n",
    "\n",
    "Leave the `description` out as it is not relevant for your next task.\n",
    "    \n",
    "For creating `features` column we use Vector Assembler\n",
    "``` python\n",
    "from pyspark.ml.feature import VectorAssembler\n",
    "vecAssembler = VectorAssembler(inputCols=[\"a\", \"b\", \"c\"], outputCol=\"features\",handleInvalid = \"skip\")\n",
    "new_df = vecAssembler.transform(dataFrame)\n",
    "new_df.show()\n",
    "\n",
    "```\n",
    "<details><summary><b>Did you know!</b></summary>\n",
    "<p>\n",
    "\n",
    "Vector has two types in Spark:\n",
    "    \n",
    "    1. Dense Vector\n",
    "    2. Sparse Vector\n",
    "    \n",
    "_Sparse vector_ is when you have many values in the vector as zero or null.\n",
    "\n",
    "_Dense vector_ is when most of the values in the vector are non zero or non-null.\n",
    "\n",
    "</p>\n",
    "</details>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "\n",
    "🤔 **Question**\n",
    "\n",
    "Have you noticed `handleInvalid` param?  \n",
    "```python\n",
    "handleInvalid = \"skip\"\n",
    "```\n",
    "\n",
    "\n",
    "\n",
    "\n",
    "What happens if you remove it?\n",
    "\n",
    "Validate yourself with `vecAssembler.show()`\n",
    "\n",
    "Notice! You have a new DataFrame now. \n",
    "\n",
    "Remember to check yourself and work with the new DataFrame"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# your code goes here"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "---"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "✅ **Task :** \n",
    "\n",
    "To run ML training phase on the scalar vector you need to create DataFrame out of it.\n",
    "\n",
    "`bot` and `features` are the **only** columns that we care about.\n",
    "\n",
    "Do it with drop function:\n",
    "    \n",
    "```python\n",
    "\n",
    "output = df_train.drop(\"val1\",\"val2\")\n",
    "```\n",
    "\n",
    "\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "# run this:\n",
    "output_train = df_train.drop('screen_name','location','followers_count','friends_count','listed_count','favourites_count','verified','statuses_count','status','default_profile','name')\n",
    "output_train.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "After turning _numeric_ columns into one `features` column and dropping `description`:\n",
    "\n",
    "We got left with creating `label` column:"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create a new DataFrame with `label` column:\n",
    "Code sample:\n",
    "```python\n",
    "df_for_lr  = output_train.selectExpr(\"features\", \"bot as label\")\n",
    "df_for_lr.show()\n",
    "\n",
    "df_for_lr.toPandas().transpose()\n",
    "```\n",
    "\n",
    "\n",
    "Notice that now `df_for_lr` is your new DataFrame for creating `LinearRegression` model"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# your code goes here"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Run the next code.\n",
    "Check out where you use the new DataFrame - `df_for_lr`\n",
    "\n",
    "It creates a machine learning model out of linear regression.\n",
    "\n",
    "Tweak the `maxIter`,`regParam` and `elasticNetParam` !"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.ml.regression import LinearRegression\n",
    "\n",
    "\n",
    "lr = LinearRegression(maxIter=10, regParam=0.3, elasticNetParam=0.8)\n",
    "\n",
    "# Fit the model\n",
    "lrModel = lr.fit(df_for_lr)\n",
    "\n",
    "# Print the coefficients and intercept for linear regression\n",
    "print(\"Coefficients: %s\" % str(lrModel.coefficients))\n",
    "print(\"Intercept: %s\" % str(lrModel.intercept))\n",
    "\n",
    "# Summarize the model over the training set and print out some metrics\n",
    "trainingSummary = lrModel.summary\n",
    "print(\"numIterations: %d\" % trainingSummary.totalIterations)\n",
    "print(\"objectiveHistory: %s\" % str(trainingSummary.objectiveHistory))\n",
    "trainingSummary.residuals.show()\n",
    "print(\"RMSE: %f\" % trainingSummary.rootMeanSquaredError)\n",
    "print(\"r2: %f\" % trainingSummary.r2)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "What did you get?\n",
    "\n",
    "How does it look like?\n",
    "\n",
    "What is `r2`? `r2` is a shortcut for R Square: \n",
    ">R-squared is a statistical measure of how close the data are to the fitted regression line. It is also known as the coefficient of determination, or the coefficient of multiple determination for multiple regression.\n",
    "\n",
    "What is RMSE?\n",
    "> Root Mean Square Error (RMSE) is the standard deviation of the residuals (prediction errors). Residuals are a measure of how far from the regression line data points are; RMSE is a measure of how spread out these residuals are. In other words, it tells you how concentrated the data is around the line of best fit.\n",
    "\n",
    "Try to play with the parameters and watch how they change."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "RMSE alone is meaningless until we compare with the actual `label` value, such as mean, min and max. \n",
    "After such comparison, our RMSE looks pretty good.\n",
    "\n",
    "Compare `RMSE` and `mean` output.\n",
    "After such comparison, our RMSE looks pretty good.\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_for_lr.describe().show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "You built 2 machine learning models!\n",
    "\n",
    "However, didn't get the best results.\n",
    "\n",
    "It's OK!\n",
    "\n",
    "And absolutly normal.\n",
    "\n",
    "In chapter 4 you learn how to improve it."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### To load the models later, Save them to file:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "lrModel.save(\"linearRegression_model\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "fpGrowth_model.save(\"fpGrowth_model\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Well Done! 👏👏👏\n",
    "## You just finished: Apache Spark ML and created machine learning models¶\n",
    "## Next Chapter: Evaluating ML models and using pipelines "
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
